Source code for hysop.iterative_method

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Description of an iterative method (iteration, stopping criteria, ...)"""

from hysop import Simulation, Problem
from hysop.parameters.scalar_parameter import ScalarParameter
from hysop.tools.contexts import Timer
from hysop import dprint, vprint
from hysop.tools.decorators import debug, profile
from hysop.core.graph.graph import ready
from hysop.constants import HYSOP_REAL, HYSOP_INTEGER
from hysop.tools.numpywrappers import npw
from hysop.core.mpi import main_rank, main_size, main_comm
import numpy as np
import datetime


[docs] class PseudoSimulation(Simulation): """Pseudo time-iterations for iterative method""" def __init__(self, stop_criteria, tolerance=1e-8, **kwds): """ Parameters ---------- stop_criteria : TensorParameter Iterative loop test for stopping is stop_criteria<tolerance tolerance : float (optional) Tolerance for stopping criteria (default to 1e-8) Notes ----- This object implement an iterative method in a pseudo-time inside a 'real' time interval. It can be used as sub-iterations for a fixed-point operator. It implements a loop defined as follows: .. code:: while(t<end and iteration<max_iter and max(stop_criteria)>tolerance) """ super().__init__(**kwds) self.stop_criteria, self.tolerance = stop_criteria, tolerance
[docs] def advance(self, dbg=None, plot_freq=10): """Proceed to next time. * Advance time and iteration number. * Compute the new timestep * check if simulation is over. """ super().advance(dbg=dbg, plot_freq=plot_freq) if np.max(self.stop_criteria.value) <= self.tolerance: self.is_over = True return
[docs] def print_state(self, verbose=None): """Print current iteration parameters""" if isinstance(self.stop_criteria, ScalarParameter): crit = f"{self.stop_criteria.value:.8g}" else: crit = np.array2string( self.stop_criteria.value, formatter={"float_kind": lambda x: f"{x:.8g}"} ) msg = "=== PseudoSimulation : {0:6d}, criteria = {1} ==" if verbose: print(msg.format(self.current_iteration, crit)) else: vprint(msg.format(self.current_iteration, crit))
[docs] class IterativeMethod(Problem): """Overriding a Problem to enfoce a PseudoSimulation for iterative method loop. The PseudoSimulation is created on each 'apply'. There is no meaning for the underlying pseudo-time. Notes ----- Sub-timestepping is not a usual use-case for this Problem. One should override Problem class in a proper way. Here only a pseudo-timestep is used together with a maximal number of iteration to compute a pseudo-final time. """ def __new__( cls, stop_criteria, tolerance=1e-8, state_print=100, max_iter=10000, dt0=None, dt=None, configsimu=None, **kwargs, ): return super().__new__(cls, **kwargs) def __init__( self, stop_criteria, tolerance=1e-8, state_print=100, max_iter=10000, dt0=None, dt=None, configsimu=None, **kwargs, ): super().__init__(**kwargs) self.stop_criteria, self.tolerance = stop_criteria, tolerance # create a pseudo-time step parameter if not given. if dt is None: dt = ScalarParameter( name="pseudo-dt", dtype=HYSOP_REAL, min_value=np.finfo(HYSOP_REAL).eps, initial_value=np.finfo(HYSOP_REAL).eps, quiet=True, ) else: dt.value = np.finfo(HYSOP_REAL).eps self.dt0, self.dt = dt0, dt self.state_print = state_print self.max_iter = max_iter # Store the iteration number of each call to 'apply' in a parameter. self.it_num = ScalarParameter( name="iterations", dtype=HYSOP_INTEGER, initial_value=0, quiet=True ) # stop_criteria reset value self._stop_criteria_reset = npw.ones( self.stop_criteria.shape, dtype=self.stop_criteria.dtype ) self._stop_criteria_reset[...] = 1e10 self.configsimu = self._default_configsimu if not configsimu is None: self.configsimu = configsimu def _default_configsimu(self, l, s): pass
[docs] @debug @ready def apply(self, simulation, report_freq=0, dbg=None, **kwds): if self.to_be_skipped(self, simulation=simulation, **kwds): return self.run_iterations( simulation=simulation, report_freq=report_freq, dbg=dbg, **kwds )
[docs] @profile def run_iterations(self, simulation, report_freq=0, dbg=None, **kwds): """This function si meant to clarify the profiling data""" vprint("=== Entering iterative method...") self.stop_criteria.value = self._stop_criteria_reset # Initialize the pseudo-simulation for iterative loop # end time is computed from pseudo-time step and a maximal number of iterations. loop = PseudoSimulation( stop_criteria=self.stop_criteria, tolerance=self.tolerance, start=simulation.t(), end=simulation.t() + self.max_iter * self.dt0, max_iter=self.max_iter, dt0=self.dt0, dt=self.dt, t=ScalarParameter(name="dummy-t", dtype=self.dt.dtype, quiet=True), mpi_params=self.mpi_params, ) self.configsimu(loop, simulation) # Usual initialize-loop-finalize sequence : if not loop._is_ready: loop.initialize() with Timer() as tm: while not loop.is_over: if loop.current_iteration % self.state_print == 0: loop.print_state() super().apply(simulation=loop, dbg=dbg, **kwds) loop.advance(dbg=dbg) if report_freq > 0 and (loop.current_iteration % report_freq) == 0: self.profiler_report() avg_time = self.mpi_params.comm.allreduce(tm.interval) / self.mpi_params.size msg = "=== Leaving iterative method ({0} iterations, criteria = {1} in {2} ({3}s) {4})" vprint( msg.format( loop.current_iteration, np.array2string( loop.stop_criteria.value, formatter={"float_kind": lambda x: f"{x:8.8f}"}, ), datetime.timedelta(seconds=round(avg_time)), avg_time, ( "" if self.mpi_params.size == 1 else f", averaged over {self.mpi_params.size} ranks. " ), ) ) self.it_num.value = loop.current_iteration loop.finalize() self.final_report()
[docs] def get_preserved_input_fields(self): return set()
[docs] def final_report(self): pass